-
Notifications
You must be signed in to change notification settings - Fork 90
/
AMQPConnection.m
103 lines (82 loc) · 2.83 KB
/
AMQPConnection.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
//
// AMQPConnection.m
// This file is part of librabbitmq-objc.
// Copyright (C) 2014 *Prof. MAAD* aka Max Wolter
// librabbitmq-objc is released under the terms of the GNU Lesser General Public License Version 3.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#import "AMQPConnection.h"
# import <amqp.h>
# import <amqp_framing.h>
# import <unistd.h>
# import "AMQPChannel.h"
@implementation AMQPConnection
@synthesize internalConnection = connection;
- (id)init
{
if(self = [super init])
{
connection = amqp_new_connection();
nextChannel = 1;
}
return self;
}
- (void)dealloc
{
[self disconnect];
amqp_destroy_connection(connection);
[super dealloc];
}
- (void)connectToHost:(NSString*)host onPort:(int)port
{
socketFD = amqp_open_socket([host UTF8String], port);
if(socketFD < 0)
{
[NSException raise:@"AMQPConnectionException" format:@"Unable to open socket to host %@ on port %d", host, port];
}
amqp_set_sockfd(connection, socketFD);
}
- (void)loginAsUser:(NSString*)username withPasswort:(NSString*)password onVHost:(NSString*)vhost
{
amqp_rpc_reply_t reply = amqp_login(connection, [vhost UTF8String], 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, [username UTF8String], [password UTF8String]);
if(reply.reply_type != AMQP_RESPONSE_NORMAL)
{
[NSException raise:@"AMQPLoginException" format:@"Failed to login to server as user %@ on vhost %@ using password %@: %@", username, vhost, password, [self errorDescriptionForReply:reply]];
}
}
- (void)disconnect
{
amqp_rpc_reply_t reply = amqp_connection_close(connection, AMQP_REPLY_SUCCESS);
if(reply.reply_type != AMQP_RESPONSE_NORMAL)
{
[NSException raise:@"AMQPConnectionException" format:@"Unable to disconnect from host: %@", [self errorDescriptionForReply:reply]];
}
close(socketFD);
}
- (void)checkLastOperation:(NSString*)context
{
amqp_rpc_reply_t reply = amqp_get_rpc_reply(connection);
if(reply.reply_type != AMQP_RESPONSE_NORMAL)
{
[NSException raise:@"AMQPException" format:@"%@: %@", context, [self errorDescriptionForReply:reply]];
}
}
- (AMQPChannel*)openChannel
{
AMQPChannel *channel = [[AMQPChannel alloc] init];
[channel openChannel:nextChannel onConnection:self];
nextChannel++;
return [channel autorelease];
}
@end